In [0]:
import pyspark
from pyspark.sql import SparkSession


# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# State code -> full state name. Wrapped in a read-only broadcast variable so
# executors read one cached copy instead of receiving it with every task.
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

# Sample rows: (firstname, lastname, country, state code).
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL"),
]

columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data, schema=columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code, lookup=None):
    """Translate a two-letter state code into its full state name.

    Args:
        code: Value from the ``state`` column. May be ``None``, because the
            column is nullable.
        lookup: Optional code -> name mapping. Defaults to the broadcast
            ``states`` dictionary (``broadcastStates.value``), which is what
            the ``df.rdd.map`` transformation relies on.

    Returns:
        The full state name, or ``code`` unchanged when the mapping has no
        entry for it. An unexpected code or a null therefore passes through
        instead of aborting the whole Spark job with a ``KeyError``.
    """
    if lookup is None:
        # Read .value at call time (not definition time) so that, inside an
        # RDD transformation, the executor-side broadcast copy is used.
        lookup = broadcastStates.value
    return lookup.get(code, code)

# Swap each row's state code for its full name. The lambda runs on the
# executors, where state_convert looks the code up in the broadcast dict.
# Fields are read by name rather than by position.
result = df.rdd.map(
    lambda row: (row.firstname, row.lastname, row.country, state_convert(row.state))
).toDF(columns)
result.show(truncate=False)

# Column.isin() takes individual values or a list/set, not a dict (handing it
# broadcastStates.value directly raised a Py4JJavaError), so collect the state
# codes into a list on the driver first.
keys_list = list(broadcastStates.value)
filteDf = df.where(df['state'].isin(keys_list))
filteDf.show(truncate=False)


root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-905482920986511>:29
     25 result.show(truncate=False)
     27 # Broadcast variable on filter
---> 29 filteDf= df.where((df['state'].isin(broadcastStates.value)))

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, *

In [0]:
#Import the necessary modules from the pyspark.sql and pyspark packages.
#Create a SparkSession named 'SparkByExamples.com'.
#Define a dictionary states that maps state codes to state names.
#Broadcast the states dictionary using spark.sparkContext.broadcast(states). This allows the variable to be efficiently shared across the Spark cluster.
#Define the data as a list of tuples, where each tuple represents a row in the DataFrame.
#Define the column names for the DataFrame.
#Create the DataFrame df using the provided data and schema.
#Print the schema of the DataFrame using the printSchema() method.
#Display the contents of the DataFrame using the show() method.
#Define a function state_convert that takes a state code and returns the corresponding state name using the broadcasted states variable.
#Use df.rdd.map() to apply the state_convert function to each row of the DataFrame, converting the state codes to state names. Then, convert the resulting RDD back to a DataFrame using .toDF(columns).
#Display the contents of the resulting DataFrame using the show() method.
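#Filter the DataFrame df by the broadcast state codes: first copy the dictionary's keys into a list (keys_list = list(broadcastStates.value.keys())), then call df.where(df['state'].isin(keys_list)). This keeps only the rows whose "state" column holds one of the state codes in the broadcast dictionary. Passing the dictionary itself, as in df.where((df['state'].isin(broadcastStates.value))), does not work: isin() expects individual values or a list/set, and the dict fails with the Py4JJavaError shown in the cell output above.
#The result is assigned to filteDf and displayed with filteDf.show(truncate=False).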
#In summary, this code demonstrates how to use a broadcast variable in PySpark to efficiently share a read-only variable across the Spark cluster. Here the broadcast dictionary is read on the executors to convert state codes to state names (inside df.rdd.map), and read on the driver to obtain the list of state codes used to filter the DataFrame.


In [0]:
import pyspark
from pyspark.sql import SparkSession


# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# State code -> full state name. Wrapped in a read-only broadcast variable so
# executors read one cached copy instead of receiving it with every task.
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

# Sample rows: (firstname, lastname, country, state code).
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL"),
]

columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data, schema=columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code, lookup=None):
    """Translate a two-letter state code into its full state name.

    Args:
        code: Value from the ``state`` column. May be ``None``, because the
            column is nullable.
        lookup: Optional code -> name mapping. Defaults to the broadcast
            ``states`` dictionary (``broadcastStates.value``), which is what
            the ``df.rdd.map`` transformation relies on.

    Returns:
        The full state name, or ``code`` unchanged when the mapping has no
        entry for it. An unexpected code or a null therefore passes through
        instead of aborting the whole Spark job with a ``KeyError``.
    """
    if lookup is None:
        # Read .value at call time (not definition time) so that, inside an
        # RDD transformation, the executor-side broadcast copy is used.
        lookup = broadcastStates.value
    return lookup.get(code, code)

# Swap each row's state code for its full name. The lambda runs on the
# executors, where state_convert looks the code up in the broadcast dict.
# Fields are read by name rather than by position.
result = df.rdd.map(
    lambda row: (row.firstname, row.lastname, row.country, state_convert(row.state))
).toDF(columns)
result.show(truncate=False)

# Column.isin() takes individual values or a list/set, not a dict, so collect
# the state codes into a list on the driver first.
keys_list = list(broadcastStates.value)
filteDf = df.where(df['state'].isin(keys_list))
filteDf.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA  

In [0]:
#Retrieve the keys from the broadcasted states dictionary using list(broadcastStates.value.keys()). This step is needed because Column.isin() accepts individual values or a list/set of values, not a dictionary.
#Use the retrieved keys to filter the DataFrame df using df.where(df['state'].isin(keys_list)). This keeps only the rows whose "state" column value is one of the keys of the broadcast dictionary.
#The filtered DataFrame is assigned to filteDf and displayed with filteDf.show(truncate=False).